Skip to content

Conversation

@freeznet
Copy link
Member

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

Explain here the context, and why you're making that change. What is the problem you're trying to solve.

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

freeznet added 3 commits July 25, 2025 09:20
…istence, delayed delivery, dispatch rate, publish rate, and inactive topic policies
… including compaction, persistence, delayed delivery, dispatch rate, publish rate, and inactive topic management
@freeznet freeznet self-assigned this Jul 25, 2025
@freeznet freeznet requested review from a team as code owners July 25, 2025 01:44
@github-actions
Copy link
Contributor

@freeznet:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@github-actions github-actions bot added the doc-info-missing This pr needs to mark a document option in description label Jul 25, 2025
@labuladong labuladong requested a review from Copilot July 25, 2025 02:28
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR enhances the PulsarTopic resource by adding support for several missing topic-level policies. The changes enable users to configure advanced persistence settings, delayed delivery, rate limiting, and inactive topic management policies directly through the Kubernetes resource definition.

Key changes include:

  • Added new policy structs for PersistencePolicies, DelayedDeliveryData, DispatchRate, PublishRate, and InactiveTopicPolicies
  • Extended PulsarTopic spec to include these new policy fields
  • Implemented backend support in admin client and reconciliation logic
  • Added comprehensive test coverage for all new policies
  • Updated documentation with detailed usage examples

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
api/v1alpha1/pulsartopic_types.go Added new policy type definitions and extended PulsarTopicSpec
config/crd/bases/resource.streamnative.io_pulsartopics.yaml Updated CRD schema with new policy field definitions
pkg/admin/interface.go Extended TopicParams struct with new policy fields
pkg/admin/impl.go Implemented policy application logic in admin client
pkg/connection/reconcile_topic.go Added new policy fields to topic parameter creation
tests/utils/spec.go Added utility functions for creating topics with different policy configurations
tests/operator/resources_test.go Added comprehensive integration tests for all new policies
docs/pulsar_topic.md Updated documentation with detailed policy descriptions and examples
api/v1alpha1/zz_generated.deepcopy.go Auto-generated deepcopy methods for new types
go.work.sum Updated go module checksums

}

// Handle inactive topic policies
if params.InactiveTopicPolicies != nil {
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The InactiveTopicDeleteMode field is a pointer and should be checked for nil before dereferencing.

Suggested change
if params.InactiveTopicPolicies != nil {
if params.InactiveTopicPolicies != nil {
if params.InactiveTopicPolicies.InactiveTopicDeleteMode == nil {
return fmt.Errorf("InactiveTopicDeleteMode is nil in InactiveTopicPolicies")
}

Copilot uses AI. Check for mistakes.
Comment on lines +373 to +375
inactiveTopicPolicies := utils.InactiveTopicPolicies{
InactiveTopicDeleteMode: &deleteMode,
MaxInactiveDurationSeconds: int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds),
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The MaxInactiveDurationInSeconds field is a pointer and should be checked for nil before dereferencing.

Suggested change
inactiveTopicPolicies := utils.InactiveTopicPolicies{
InactiveTopicDeleteMode: &deleteMode,
MaxInactiveDurationSeconds: int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds),
var maxInactiveDurationSeconds int
if params.InactiveTopicPolicies.MaxInactiveDurationInSeconds != nil {
maxInactiveDurationSeconds = int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds)
} else {
maxInactiveDurationSeconds = 0 // Default value if nil
}
inactiveTopicPolicies := utils.InactiveTopicPolicies{
InactiveTopicDeleteMode: &deleteMode,
MaxInactiveDurationSeconds: maxInactiveDurationSeconds,

Copilot uses AI. Check for mistakes.
Comment on lines +376 to +377
DeleteWhileInactive: *params.InactiveTopicPolicies.DeleteWhileInactive,
}
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The DeleteWhileInactive field is a pointer and should be checked for nil before dereferencing.

Suggested change
DeleteWhileInactive: *params.InactiveTopicPolicies.DeleteWhileInactive,
}
DeleteWhileInactive: false, // Default value
}
if params.InactiveTopicPolicies.DeleteWhileInactive != nil {
inactiveTopicPolicies.DeleteWhileInactive = *params.InactiveTopicPolicies.DeleteWhileInactive
}

Copilot uses AI. Check for mistakes.
return err
}
}

Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The BookkeeperEnsemble field is a pointer and should be checked for nil before dereferencing.

Suggested change
// Check for nil pointers in PersistencePolicies fields
if params.PersistencePolicies.BookkeeperEnsemble == nil {
return errors.New("BookkeeperEnsemble cannot be nil")
}

Copilot uses AI. Check for mistakes.

persistenceData := utils.PersistenceData{
BookkeeperEnsemble: int64(*params.PersistencePolicies.BookkeeperEnsemble),
BookkeeperWriteQuorum: int64(*params.PersistencePolicies.BookkeeperWriteQuorum),
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The BookkeeperWriteQuorum field is a pointer and should be checked for nil before dereferencing.

Copilot uses AI. Check for mistakes.
Comment on lines +347 to +348
dispatchRateData := utils.DispatchRateData{
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The DispatchThrottlingRateInMsg field is a pointer and should be checked for nil before dereferencing.

Suggested change
dispatchRateData := utils.DispatchRateData{
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
var dispatchThrottlingRateInMsg int64
if params.DispatchRate.DispatchThrottlingRateInMsg != nil {
dispatchThrottlingRateInMsg = int64(*params.DispatchRate.DispatchThrottlingRateInMsg)
}
dispatchRateData := utils.DispatchRateData{
DispatchThrottlingRateInMsg: dispatchThrottlingRateInMsg,

Copilot uses AI. Check for mistakes.
Comment on lines +348 to +351
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte,
RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond),
}
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The DispatchThrottlingRateInByte field is a pointer and should be checked for nil before dereferencing.

Suggested change
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte,
RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond),
}
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
}
if params.DispatchRate.DispatchThrottlingRateInByte != nil {
dispatchRateData.DispatchThrottlingRateInByte = *params.DispatchRate.DispatchThrottlingRateInByte
}
dispatchRateData.RatePeriodInSecond = int64(*params.DispatchRate.RatePeriodInSecond)

Copilot uses AI. Check for mistakes.
dispatchRateData := utils.DispatchRateData{
DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg),
DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte,
RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond),
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The RatePeriodInSecond field is a pointer and should be checked for nil before dereferencing.

Suggested change
RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond),
RatePeriodInSecond: func() int64 {
if params.DispatchRate.RatePeriodInSecond != nil {
return int64(*params.DispatchRate.RatePeriodInSecond)
}
return 0 // Default value if RatePeriodInSecond is nil
}(),

Copilot uses AI. Check for mistakes.
// Handle publish rate
if params.PublishRate != nil {
publishRateData := utils.PublishRateData{
PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg),
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The PublishThrottlingRateInMsg field is a pointer and should be checked for nil before dereferencing.

Suggested change
PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg),
PublishThrottlingRateInMsg: func() int64 {
if params.PublishRate.PublishThrottlingRateInMsg != nil {
return int64(*params.PublishRate.PublishThrottlingRateInMsg)
}
return 0
}(),

Copilot uses AI. Check for mistakes.
Comment on lines +361 to +362
PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg),
PublishThrottlingRateInByte: *params.PublishRate.PublishThrottlingRateInByte,
Copy link

Copilot AI Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential nil pointer dereference. The PublishThrottlingRateInByte field is a pointer and should be checked for nil before dereferencing.

Suggested change
PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg),
PublishThrottlingRateInByte: *params.PublishRate.PublishThrottlingRateInByte,
PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg),
PublishThrottlingRateInByte: func() int64 {
if params.PublishRate.PublishThrottlingRateInByte != nil {
return *params.PublishRate.PublishThrottlingRateInByte
}
return 0
}(),

Copilot uses AI. Check for mistakes.
@freeznet freeznet merged commit 8d8c6f9 into main Jul 28, 2025
6 checks passed
@freeznet freeznet deleted the freeznet/align-topic-level-polcies branch July 28, 2025 02:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-info-missing This pr needs to mark a document option in description

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants